package com.sn.miaosha.backdoor.util;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.ons.model.v20190214.OnsConsumerGetConnectionRequest;
import com.aliyuncs.ons.model.v20190214.OnsConsumerGetConnectionResponse;
import com.aliyuncs.ons.model.v20190214.OnsMessageGetByMsgIdRequest;
import com.aliyuncs.ons.model.v20190214.OnsMessageGetByMsgIdResponse;
import com.aliyuncs.ons.model.v20190214.OnsMessagePushRequest;
import com.aliyuncs.ons.model.v20190214.OnsTraceGetResultRequest;
import com.aliyuncs.ons.model.v20190214.OnsTraceGetResultResponse;
import com.aliyuncs.ons.model.v20190214.OnsTraceQueryByMsgIdRequest;
import com.aliyuncs.ons.model.v20190214.OnsTraceQueryByMsgIdResponse;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import com.ncarzone.framework.insight.common.utils.JsonUtils;
import com.sn.miaosha.backdoor.entity.MessagePropertyRo;
import com.sn.miaosha.backdoor.entity.OnsRestMessageDo;
import com.sn.miaosha.backdoor.entity.TestResult;
import com.sn.miaosha.backdoor.httprequest.HttpRequest;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class OnsUtil {

    private static Logger log = LoggerFactory.getLogger(OnsUtil.class);


    private static String region_dev = "cn-shanghai";
    private static String instanceId_dev = "MQ_INST_1846819814603408_BXig0x6A";
    private static String accessKeyID_dev = "LTAIENEMp45wZoNP";
    private static String accessKeySecret_dev = "vHuxbRPp4lJTfvktPBH1TxnZRuhZaE";
    private static String namesAddr_dev = "http://MQ_INST_1846819814603408_BXig0x6A.mq-internet-access.mq-internet.aliyuncs.com:80";
    private static String httpEndpoint_dev = "http://1846819814603408.mqrest.cn-shanghai.aliyuncs.com";


    private static String region_test = "cn-shanghai";
    private static String instanceId_test = "MQ_INST_1846819814603408_BXig0x6A";
    private static String accessKeyID_test = "LTAIENEMp45wZoNP";
    private static String accessKeySecret_test = "vHuxbRPp4lJTfvktPBH1TxnZRuhZaE";
    private static String namesAddr_test = "http://MQ_INST_1846819814603408_BXig0x6A.cn-shanghai.mq-internal.aliyuncs.com:8080";
    private static String httpEndpoint_test = "http://1846819814603408.mqrest.cn-shanghai.aliyuncs.com";


    private static String region_pre = "cn-shanghai";
    private static String instanceId_pre = "MQ_INST_1846819814603408_BXig05SH";
    private static String accessKeyID_pre = "LTAI4FsrdV82CpTBApbUL4tw";
    private static String accessKeySecret_pre = "8tGCLn7BRwCUvNEcKvviZUGObko9SV";
    private static String namesAddr_pre = "http://MQ_INST_1846819814603408_BXig05SH.cn-shanghai.mq-internal.aliyuncs.com:8080";
    private static String httpEndpoint_pre = "http://1846819814603408.mqrest.cn-shanghai.aliyuncs.com";


    private static String region_prod = "cn-shanghai";
    private static String instanceId_prod = "MQ_INST_1846819814603408_BbGDtx5U";
    private static String accessKeyID_prod = "LTAIVotGSTNFVZ5T";
    private static String accessKeySecret_prod = "zDMhatUwdkv6FhMMD77G7Gi2VeYUQC";
    private static String namesAddr_prod = "http://MQ_INST_1846819814603408_BbGDtx5U.cn-shanghai.mq-internal.aliyuncs.com:8080";
    private static String httpEndpoint_prod = "http://1846819814603408.mqrest.cn-shanghai.aliyuncs.com";

    private static final String REST_ONS_URL = "https://pre-i0-nczgateway.carzone360.com/gateway/i0/cloudplatform/api/mq/msg/by-key?region_id=cn-shanghai&instance_id=MQ_INST_1846819814603408_BbGDtx5U&topic=";


    public static String queryMessageByMsgId(Integer envType, String topic, String msgId) {
        OnsMessageGetByMsgIdRequest onsMessageGetByMsgIdRequest = new OnsMessageGetByMsgIdRequest();
        OnsConfig onsConfig = getOnsConfig(envType);
        if (Objects.isNull(onsConfig)) {
            throw new RuntimeException("获取配置异常");
        }
        try {
            OnsMessageGetByMsgIdRequest messageGetByMsgIdRequest = new OnsMessageGetByMsgIdRequest();
            messageGetByMsgIdRequest.setInstanceId(onsConfig.instanceId);
            messageGetByMsgIdRequest.setMsgId(msgId);
            messageGetByMsgIdRequest.setTopic(topic);
            IClientProfile profile = DefaultProfile.getProfile(onsConfig.region, onsConfig.accessKeyID, onsConfig.accessKeySecret);
            IAcsClient iAcsClient = new DefaultAcsClient(profile);
            OnsMessageGetByMsgIdResponse onsMessageGetByMsgIdResponse = iAcsClient.getAcsResponse(onsMessageGetByMsgIdRequest);
            OnsMessageGetByMsgIdResponse.Data data = onsMessageGetByMsgIdResponse.getData();
            return JsonUtil.toJSONString(data);
        } catch (ClientException e) {
            e.printStackTrace();
        }
        return null;
    }


    /**
     * 发送消息(无环境标识)
     *
     * @param topic
     * @param tag
     * @param key
     * @param messageStr
     * @Param producer
     */
    public static String sendMessage(Integer envType, String topic, String tag, String key, String messageStr) {
        return sendMessage(envType, null, topic, tag, key, messageStr);
    }

    /**
     * 发送消息(带环境标识)
     *
     * @param topic
     * @param tag
     * @param key
     * @param messageStr
     * @Param producer
     * @Param
     */
    public static String sendMessage(Integer envType, String isoEnv, String topic, String tag, String key, String messageStr) {
        OnsConfig onsConfig = getOnsConfig(envType);
        if (Objects.isNull(onsConfig)) {
            throw new RuntimeException("获取配置异常");
        }
        MQClient mqClient = new MQClient(
                // 设置HTTP接入域名（此处以公共云生产环境为例）
                onsConfig.httpEndpoint,
                // AccessKey 阿里云身份验证，在阿里云服务器管理控制台创建
                onsConfig.accessKeyID,
                // SecretKey 阿里云身份验证，在阿里云服务器管理控制台创建
                onsConfig.accessKeySecret
        );
        try {
            // 获取Topic的生产者
            MQProducer producer = mqClient.getProducer(onsConfig.instanceId, topic);
            TopicMessage pubMsg = new TopicMessage();
            pubMsg.setMessageKey(key);
            pubMsg.setMessageTag(tag);
            pubMsg.setMessageBody(messageStr);
            // 设置属性
            if (null != isoEnv && isoEnv.length() > 0) {
                pubMsg.getProperties().put("environment_coordinate", isoEnv);
            }
            TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
            String messageId = pubResultMsg.getMessageId();
            return messageId;
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        } finally {
            mqClient.close();
        }
        return null;
    }

    /**
     * 消费验证
     *
     * @param envType
     * @param msgId
     * @param topic
     * @param groupId
     */
    public static void consumeValidate(Integer envType, String msgId, String topic, String groupId) throws ClientException {
        consumeValidate(envType, msgId, topic, groupId, null);
    }

    /**
     * 指定IP消费验证
     *
     * @param envType
     * @param msgId
     * @param topic
     * @param groupId
     */
    public static void consumeValidate(Integer envType, String msgId, String topic, String groupId, String ip) throws ClientException {
        String client = getClient(envType, groupId, ip);
        if (StringUtils.isEmpty(client)) {
            throw new RuntimeException("获取client失败");
        }
        OnsConfig onsConfig = getOnsConfig(envType);
        if (Objects.isNull(onsConfig)) {
            throw new RuntimeException("获取配置异常");
        }
        OnsMessagePushRequest onsMessagePushRequest = new OnsMessagePushRequest();
        onsMessagePushRequest.setMsgId(msgId);
        onsMessagePushRequest.setTopic(topic);
        onsMessagePushRequest.setGroupId(groupId);
        onsMessagePushRequest.setInstanceId(onsConfig.instanceId);
        onsMessagePushRequest.setClientId(client);
        IClientProfile profile = DefaultProfile.getProfile(onsConfig.region, onsConfig.accessKeyID, onsConfig.accessKeySecret);
        IAcsClient iAcsClient = new DefaultAcsClient(profile);
        iAcsClient.getAcsResponse(onsMessagePushRequest);
    }

    /**
     * 消息轨迹查询
     *
     * @param envType
     * @param msgId
     */
    public static String onsTraceQuery(Integer envType, String msgId, String topic, Long beginTime, Long endTime) {
        OnsConfig onsConfig = getOnsConfig(envType);
        if (Objects.isNull(onsConfig)) {
            throw new RuntimeException("获取配置异常");
        }
        try {
            OnsTraceQueryByMsgIdRequest onsTraceQueryByMsgIdRequest = new OnsTraceQueryByMsgIdRequest();
            onsTraceQueryByMsgIdRequest.setInstanceId(onsConfig.instanceId);
            onsTraceQueryByMsgIdRequest.setMsgId(msgId);
            onsTraceQueryByMsgIdRequest.setTopic(topic);
            Date date = new Date();
            if (null == beginTime) {
                beginTime = DateUtil.beginOfDay(date).getTime();
            }
            if (null == endTime) {
                endTime = DateUtil.endOfDay(date).getTime();
            }
            onsTraceQueryByMsgIdRequest.setBeginTime(beginTime);
            onsTraceQueryByMsgIdRequest.setEndTime(endTime);
            IClientProfile profile = DefaultProfile.getProfile(onsConfig.region, onsConfig.accessKeyID, onsConfig.accessKeySecret);
            IAcsClient iAcsClient = new DefaultAcsClient(profile);
            OnsTraceQueryByMsgIdResponse onsTraceQueryByMsgIdResponse = iAcsClient.getAcsResponse(onsTraceQueryByMsgIdRequest);
            String queryId = onsTraceQueryByMsgIdResponse.getQueryId();
            return queryId;
        } catch (ClientException e) {
            e.printStackTrace();
        }
        return null;
    }


    public static String onsTraceQueryResult(Integer envType, String queryId) {
        OnsConfig onsConfig = getOnsConfig(envType);
        if (Objects.isNull(onsConfig)) {
            throw new RuntimeException("获取配置异常");
        }
        try {
            IClientProfile profile = DefaultProfile.getProfile(onsConfig.region, onsConfig.accessKeyID, onsConfig.accessKeySecret);
            IAcsClient iAcsClient = new DefaultAcsClient(profile);
            OnsTraceGetResultRequest onsTraceGetResultRequest = new OnsTraceGetResultRequest();
            onsTraceGetResultRequest.setQueryId(queryId);
            OnsTraceGetResultResponse onsTraceGetResultResponse = iAcsClient.getAcsResponse(onsTraceGetResultRequest);
            List<OnsTraceGetResultResponse.TraceData.TraceMapDo> traceList = onsTraceGetResultResponse.getTraceData().getTraceList();
            return JsonUtil.toJSONString(traceList);
        } catch (ClientException e) {
            e.printStackTrace();
        }
        return null;
    }


    private static String getClient(Integer envType, String gid, String ip) throws ClientException {
        OnsConfig onsConfig = getOnsConfig(envType);
        if (Objects.isNull(onsConfig)) {
            throw new RuntimeException("获取配置异常");
        }
        OnsConsumerGetConnectionRequest onsConsumerGetConnection = new OnsConsumerGetConnectionRequest();
        onsConsumerGetConnection.setActionName("OnsConsumerGetConnection");
        onsConsumerGetConnection.setGroupId(gid);
        onsConsumerGetConnection.setInstanceId(onsConfig.instanceId);
        IClientProfile profile = DefaultProfile.getProfile(onsConfig.region, onsConfig.accessKeyID, onsConfig.accessKeySecret);
        IAcsClient client = new DefaultAcsClient(profile);
        OnsConsumerGetConnectionResponse acsResponse = client.getAcsResponse(onsConsumerGetConnection);
        List<OnsConsumerGetConnectionResponse.Data.ConnectionDo> connectionList = acsResponse.getData().getConnectionList();
        if (CollectionUtils.isNotEmpty(connectionList)) {
            if (StrUtil.isNotEmpty(ip)) {
                Optional<OnsConsumerGetConnectionResponse.Data.ConnectionDo> first =
                        connectionList.stream().filter(connectionDo -> ip.equals(connectionDo.getClientAddr())).findFirst();
                if (first.isPresent()) {
                    return first.get().getClientId();
                }
                return null;
            } else {
                return connectionList.get(0).getClientId();
            }
        }
        return null;
    }


    public static OnsConfig getOnsConfig(int envType) {
        switch (envType) {
            case 1:
                return new OnsConfig(region_dev, instanceId_dev, accessKeyID_dev, accessKeySecret_dev, namesAddr_dev, httpEndpoint_dev);
            case 2:
                return new OnsConfig(region_test, instanceId_test, accessKeyID_test, accessKeySecret_test, namesAddr_test, httpEndpoint_test);
            case 3:
                return new OnsConfig(region_pre, instanceId_pre, accessKeyID_pre, accessKeySecret_pre, namesAddr_pre, httpEndpoint_pre);
            case 4:
                return new OnsConfig(region_prod, instanceId_prod, accessKeyID_prod, accessKeySecret_prod, namesAddr_prod, httpEndpoint_prod);
        }
        return null;
    }


    static class OnsConfig {
        public String region;
        public String instanceId;
        public String accessKeyID;
        public String accessKeySecret;
        public String nameAddr;
        public String httpEndpoint;

        public OnsConfig(String region, String instanceId, String accessKeyID, String accessKeySecret, String nameAddr, String httpEndpoint) {
            this.region = region;
            this.instanceId = instanceId;
            this.accessKeyID = accessKeyID;
            this.accessKeySecret = accessKeySecret;
            this.nameAddr = nameAddr;
            this.httpEndpoint = httpEndpoint;
        }
    }


    public static List<String> getMessageIds(String topic, String tag, String orderNoList) {
        List<String> messageIds = new ArrayList<>();
        String getUrl = REST_ONS_URL + topic + "&key_word=";
        Arrays.stream(orderNoList.split(",")).forEach(orderNo -> {
            String getResult = HttpRequest.doGet(getUrl + orderNo);
            try {
                TestResult testResult = JsonUtils.parseObject(getResult, TestResult.class);
                if (null != testResult) {
                    List<OnsRestMessageDo> onsRestMessageDoList = testResult.getData().getOnsRestMessageDo();
                    if (CollectionUtils.isNotEmpty(onsRestMessageDoList)) {
                        onsRestMessageDoList.stream().filter(x -> x.getPropertyList() != null).forEach(onsRestMessageDo -> {
                            List<MessagePropertyRo> messagePropertyRoList = onsRestMessageDo.getPropertyList().getMessageProperty();
                            //获取制定tag的msgid
                            Optional<MessagePropertyRo> messagePropertyRoOptional = messagePropertyRoList.stream().filter(x ->
                                    tag.equals(x.getValue())).findFirst();
                            if (messagePropertyRoOptional.isPresent() && StringUtils.isNotBlank(onsRestMessageDo.getMsgId())) {
                                messageIds.add(onsRestMessageDo.getMsgId());
                            }
                        });
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        return messageIds;
    }

}
